package com.my.service.task.sink;

import com.my.service.task.entity.BrandLike;
import com.my.service.task.entity.UserTypeInfo;
import com.my.service.task.util.MongoUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;

/**
 * Flink sink that folds incoming {@link UserTypeInfo} counts into a running
 * total kept in MongoDB.
 *
 * <p>For each record the sink looks up an existing document via
 * {@code MongoUtils.findOneBy}, either creates a new document (fields
 * {@code info} and {@code count}) or adds the incoming count to the stored
 * one, and then writes the document back via
 * {@code MongoUtils.saveOrUpdateMongo}.
 *
 * <p>NOTE(review): the lookup and the write are two separate calls, so the
 * update is only atomic if {@code MongoUtils} makes it so — confirm before
 * running this sink with parallelism greater than one.
 */
public class UserTypeSink implements SinkFunction<UserTypeInfo> {
    // First and second arguments passed to both MongoUtils calls.
    // Presumably the collection name and the database name — confirm
    // against the MongoUtils signatures.
    private static final String COLLECTION = "usertypestatics";
    private static final String DATABASE = "realtimeportrait";

    // Field names used in the stored document.
    private static final String INFO_FIELD = "info";
    private static final String COUNT_FIELD = "count";

    /**
     * Adds the count carried by {@code value} to the stored total for its
     * user type, creating the stored document when none exists yet.
     *
     * @param value   record supplying the user type and the count to add
     * @param context sink context supplied by Flink; not used here
     * @throws IllegalStateException if the stored {@code count} field exists
     *                               but is not numeric
     * @throws Exception             declared by the {@code SinkFunction}
     *                               contract
     */
    @Override
    public void invoke(UserTypeInfo value, Context context) throws Exception {
        String userType = value.getUserType();
        long count = value.getCount();

        Document res = MongoUtils.findOneBy(COLLECTION, DATABASE, userType);
        if (res == null) {
            // No document yet for this user type: start one with the incoming count.
            res = new Document();
            res.put(INFO_FIELD, userType);
            res.put(COUNT_FIELD, count);
        } else {
            res.put(COUNT_FIELD, readStoredCount(res) + count);
        }

        MongoUtils.saveOrUpdateMongo(COLLECTION, DATABASE, res);
    }

    /**
     * Reads the stored count without assuming its boxed type.
     *
     * <p>{@code Document.getLong} is a plain cast, so it fails when the value
     * was stored as a 32-bit integer, and unboxing its result fails when the
     * field is absent. Accepting any {@link Number} and treating an absent
     * field as zero avoids both failures.
     */
    private static long readStoredCount(Document doc) {
        Object stored = doc.get(COUNT_FIELD);
        if (stored == null) {
            return 0L;
        }
        if (stored instanceof Number) {
            return ((Number) stored).longValue();
        }
        // Refuse to overwrite a value we cannot interpret as a count.
        throw new IllegalStateException(
                "Field '" + COUNT_FIELD + "' is not numeric: " + stored.getClass().getName());
    }
}
